# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
@file graph_operator.py
Base for directionally split advection solvers (pure-python and GPU versions).
"""
import copy
import warnings
from abc import ABCMeta, abstractmethod
from hysop import dprint
from hysop.tools.htypes import InstanceOf, to_set, check_instance, first_not_None
from hysop.tools.io_utils import IOParams
from hysop.tools.parameters import MPIParams
from hysop.parameters.parameter import Parameter
from hysop.fields.continuous_field import Field, ScalarField, TensorField
from hysop.core.graph.node_requirements import NodeRequirements
from hysop.core.graph.graph import (
wraps,
not_initialized,
initialized,
discretized,
ready,
)
from hysop.core.graph.continuous import OperatorBase
from hysop.topology.topology import Topology, TopologyView
from hysop.tools.decorators import debug
from hysop.tools.warning import HysopWarning
from hysop.topology.cartesian_descriptor import get_topo_descriptor_discretization
[docs]
def base_initialized(f):
    """Decorator guarding *f* so it can only run after self._init_base()."""
    assert callable(f)

    @wraps(f)
    def _check(*args, **kwds):
        node = args[0]
        if not node._base_initialized:
            msg = (
                "Cannot call {}.{}() on node '{}' because "
                "this self._init_base() has not been called yet."
            ).format(node.__class__.__name__, f.__name__, node.name)
            raise RuntimeError(msg)
        return f(*args, **kwds)

    return _check
[docs]
def topology_handled(f):
    """Decorator guarding *f* so it can only run after self.handle_topologies()."""
    assert callable(f)

    @wraps(f)
    def _check(*args, **kwds):
        node = args[0]
        if not node.topology_handled:
            msg = (
                "Cannot call {}.{}() on node '{}' because "
                "this self.handle_topologies() has not been called yet."
            ).format(node.__class__.__name__, f.__name__, node.name)
            raise RuntimeError(msg)
        return f(*args, **kwds)

    return _check
[docs]
def to_be_skipped_default(*_args, **_kwargs):
    """Default skip predicate for an operator's apply: never skip."""
    return False
[docs]
class ComputationalGraphNode(OperatorBase, metaclass=ABCMeta):
"""
Interface of an abstract computational graph node.
"""
@debug
def __new__(
    cls,
    input_fields=None,
    output_fields=None,
    input_params=None,
    output_params=None,
    input_tensor_fields=None,
    output_tensor_fields=None,
    name=None,
    pretty_name=None,
    method=None,
    to_be_skipped_func=None,
    **kwds,
):
    """Allocate the node; real base setup is deferred to __init__.

    OperatorBase.__new__ is deliberately fed empty placeholders since
    actual fields and parameters are only known after __init__ has
    expanded tensor fields and consolidated parameters.
    """
    obj = super().__new__(
        cls, name=None, fields=None, tensor_fields=None, parameters=None, **kwds
    )
    return obj
@debug
def __init__(
    self,
    input_fields=None,
    output_fields=None,
    input_params=None,
    output_params=None,
    input_tensor_fields=None,
    output_tensor_fields=None,
    name=None,
    pretty_name=None,
    method=None,
    to_be_skipped_func=None,
    **kwds,
):
    """
    Initialize a ComputationalGraphNode.

    Parameters
    ----------
    input_fields: dict, optional
        input fields as a dictionary (see Notes).
    output_fields: dict, optional
        output fields as a dictionary (see Notes).
    input_params: array like of hysop.parameters.Parameter or dict, optional
        input parameters as a set or a dictionary (see Notes).
    output_params: array like of hysop.parameters.Parameter or dict, optional
        output parameters as a set or a dictionary (see Notes).
    input_tensor_fields: tuple, optional
        input tensor fields as a tuple.
        If given, input_fields is assumed to contain only ScalarFields,
        else tensor fields are deduced from input_fields.
    output_tensor_fields: tuple, optional
        output tensor fields as a tuple.
        If given, output_fields is assumed to contain only ScalarFields,
        else tensor fields are deduced from output_fields.
    name: str, optional
        name of this node, defaults to the top class name
        (ie. self.__class__.__name__).
    pretty_name: str, optional
        pretty name of this node, defaults to name.
    method: dict, optional
        user method specification for this graph node, defaults to None.
        It is stored in self.base_method until initialization
        (see ComputationalGraphNode.handle_method()).
    to_be_skipped_func: callable, optional
        predicate deciding whether apply() should be skipped,
        defaults to to_be_skipped_default (never skip).
    kwds:
        arguments for base classes (mpi_params and io_params).

    Attributes
    ----------
    name / pretty_name: str
        names used for printing and display purpose.
    input_fields / output_fields: dict
        scalar fields mapped to their topology descriptors.
    input_tensor_fields / output_tensor_fields: tuple
        tensor fields (VectorFields included).
    base_method: dict
        user method specification awaiting initialization.
    initialized / topology_handled / discretized / ready: bool
        lifecycle flags set by initialize(), handle_topologies(),
        discretize() and setup() respectively.

    Notes
    -----
    Keys of the field dicts must be of type
    :class:`hysop.fields.continuous_field.Field` and values must be
    topology descriptors (an already defined topology or a descriptor).
    VectorFields and TensorFields are expanded to ScalarFields.
    For parameters given as dicts, keys are Parameters and values must
    be MPIParams or None.
    Giving input_vars, output_vars, variables, iwork, rwork, work or
    backend in **kwds raises a ValueError.
    """
    # base class initialization is only performed when at least one field
    # or parameter is given; otherwise self must be a ComputationalGraph
    # (see the else branch below).
    should_init = (
        (input_fields is not None)
        or (output_fields is not None)
        or (input_params is not None)
        or (output_params is not None)
    )
    # Check extra args
    cls = self.__class__
    for _ in ("variables", "input_vars", "output_vars"):
        if _ in kwds.keys():
            msg = (
                "The '{}' parameter should not be used in {}, use input_fields and "
            )
            msg += "output_fields instead."
            msg = msg.format(_, cls)
            raise ValueError(msg)
    if ("iwork" in kwds) or ("rwork" in kwds) or ("work" in kwds):
        msg = (
            "work, rwork or iwork parameters can not be used before the full "
            "description of the graph in class {}.".format(cls)
        )
        raise ValueError(msg)
    if "backend" in kwds:
        msg = "{} is not a ComputationalGraphNodeFrontend thus no backend can be specified."
        msg = msg.format(cls)
        raise ValueError(msg)
    # Expand input and output TensorFields to ScalarFields
    # (a VectorField is just a TensorField so VectorFields are handled as well)
    if input_tensor_fields is not None:
        check_instance(input_tensor_fields, tuple, values=TensorField)
        check_instance(input_fields, dict, keys=ScalarField)
        for tfield in input_tensor_fields:
            for field in tfield:
                if field not in input_fields:
                    msg = "Input fields and input tensor fields mismatch."
                    raise RuntimeError(msg)
    elif input_fields is not None:
        input_tensor_fields = tuple(
            filter(lambda x: x.is_tensor, input_fields.keys())
        )
        # expand every (possibly tensor) key to its scalar components
        input_fields = {
            sfield: topod
            for (tfield, topod) in input_fields.items()
            for sfield in tfield.fields
        }
    else:
        input_tensor_fields = ()
    if output_tensor_fields is not None:
        check_instance(output_tensor_fields, tuple, values=TensorField)
        check_instance(output_fields, dict, keys=ScalarField)
        for tfield in output_tensor_fields:
            for field in tfield:
                if field not in output_fields:
                    msg = "Output fields and output tensor fields mismatch."
                    raise RuntimeError(msg)
    elif output_fields is not None:
        output_tensor_fields = tuple(
            filter(lambda x: x.is_tensor, output_fields.keys())
        )
        output_fields = {
            sfield: topod
            for (tfield, topod) in output_fields.items()
            for sfield in tfield.fields
        }
    else:
        output_tensor_fields = ()
    # Check input values
    input_fields = first_not_None(input_fields, {})
    output_fields = first_not_None(output_fields, {})
    input_params = first_not_None(input_params, {})
    output_params = first_not_None(output_params, {})
    method = first_not_None(method, {})
    name = first_not_None(name, self.__class__.__name__)
    pretty_name = first_not_None(pretty_name, name)
    if not isinstance(name, str):
        msg = "name is not a string but a {}."
        raise ValueError(msg.format(name.__class__))
    if not isinstance(pretty_name, str):
        msg = "pretty_name is not a string but a {}."
        # BUGFIX: previously reported name.__class__ instead of
        # pretty_name.__class__ in the error message.
        raise ValueError(msg.format(pretty_name.__class__))
    if not isinstance(input_fields, dict):
        msg = "input_fields is not a dict but a {}."
        raise ValueError(msg.format(input_fields.__class__))
    if not isinstance(output_fields, dict):
        msg = "output_fields is not a dict but a {}."
        raise ValueError(msg.format(output_fields.__class__))
    # parameters given as an iterable get a None (unknown) MPIParams,
    # consolidated later in _init_base.
    if not isinstance(input_params, dict):
        input_params = to_set(input_params)
        input_params = {p: None for p in input_params}
    if not isinstance(output_params, dict):
        output_params = to_set(output_params)
        output_params = {p: None for p in output_params}
    self.name = name
    self.pretty_name = pretty_name
    self.input_fields = input_fields
    self.output_fields = output_fields
    self.input_params = input_params
    self.output_params = output_params
    self.input_tensor_fields = input_tensor_fields
    self.output_tensor_fields = output_tensor_fields
    self.base_method = method
    # lifecycle flags
    self.initialized = False
    self.topology_handled = False
    self.discretized = False
    self.ready = False
    # discrete field containers, filled during discretization
    self.input_discrete_fields = None
    self.output_discrete_fields = None
    self.discrete_fields = None
    self.input_discrete_tensor_fields = None
    self.output_discrete_tensor_fields = None
    self.discrete_tensor_fields = None
    if not hasattr(self, "_field_requirements"):
        self._field_requirements = None
    # graph builder hints to build I/O operators.
    self._input_fields_to_dump = []
    self._output_fields_to_dump = []
    self._input_params_to_dump = []
    self._output_params_to_dump = []
    self._base_initialized = False
    self.__kwds = kwds
    if should_init:
        self._init_base(
            input_fields,
            output_fields,
            input_tensor_fields,
            output_tensor_fields,
            input_params,
            output_params,
        )
    else:
        # if we are a graph we still don't know input and output variables
        # => defer initialization of base class until full initialization.
        from hysop.core.graph.computational_graph import ComputationalGraph

        check_instance(self, ComputationalGraph)
        io_params = kwds.get("io_params", False)
        self.io_params = io_params
        self.mpi_params = kwds.get("mpi_params", None)
        self._set_io()
    if to_be_skipped_func is None:
        self.to_be_skipped = to_be_skipped_default
    else:
        self.to_be_skipped = to_be_skipped_func
def _get_is_domainless(self):
    """True when this node has neither input nor output fields."""
    no_inputs = not self.input_fields
    no_outputs = not self.output_fields
    return no_inputs and no_outputs

# read-only view of the domainless state
is_domainless = property(_get_is_domainless)
[docs]
@classmethod
def expand_tensor_fields(cls, fields):
    """Split *fields* into a (scalar_fields, tensor_fields) pair of tuples.

    Tensor fields contribute their scalar components to the first tuple
    and themselves to the second one; None entries are kept as scalar
    placeholders.
    """
    scalars = []
    tensors = []
    for f in fields:
        if f is None:
            scalars.append(None)
        elif f.is_tensor:
            scalars.extend(f.fields)
            tensors.append(f)
        else:
            scalars.append(f)
    return (tuple(scalars), tuple(tensors))
@debug
def _setup_method(self, topgraph_method):
    """
    Build the final method dictionary by combining the user base method
    with values inherited from the top graph, then validate it with
    self._check_method().
    """
    if topgraph_method:
        method = dict(self.base_method)
        avail = self.available_methods()
        # inherit only keys that are available here and not user-overridden
        inherited = set(topgraph_method).intersection(avail).difference(method)
        for key in inherited:
            method[key] = topgraph_method[key]
    else:
        method = self.base_method
    return self._check_method(method)
@debug
def _init_base(
    self,
    input_fields,
    output_fields,
    input_tensor_fields,
    output_tensor_fields,
    input_params,
    output_params,
    is_root=False,
):
    """
    Initialize the OperatorBase base class and check everything.

    Merges scalar and tensor fields, validates every container, checks
    operator mpi_params against already-built topologies, then calls
    OperatorBase.__init__ and consolidates parameter MPI parameters.
    Sets self._base_initialized to True on success.
    """
    # Merge scalar and tensor fields: tensor fields come first, followed
    # by scalar fields that do not belong to any tensor field.
    all_input_fields = tuple(input_tensor_fields)
    for ofield in input_fields.keys():
        if not any(ofield in tf for tf in input_tensor_fields):
            all_input_fields += (ofield,)
    all_output_fields = tuple(output_tensor_fields)
    for ofield in output_fields.keys():
        if not any(ofield in tf for tf in output_tensor_fields):
            all_output_fields += (ofield,)
    assert not self._base_initialized
    check_instance(input_fields, dict, keys=ScalarField)
    check_instance(output_fields, dict, keys=ScalarField)
    # parameter mpi_params may still be None here (consolidated below)
    check_instance(
        input_params, dict, keys=Parameter, values=(MPIParams, type(None))
    )
    check_instance(
        output_params, dict, keys=Parameter, values=(MPIParams, type(None))
    )
    check_instance(input_tensor_fields, tuple, values=TensorField)
    check_instance(output_tensor_fields, tuple, values=TensorField)
    check_instance(all_input_fields, tuple, values=Field)
    check_instance(all_output_fields, tuple, values=Field)
    self.input_fields = input_fields
    self.output_fields = output_fields
    self.input_params = input_params
    self.output_params = output_params
    self.input_tensor_fields = input_tensor_fields
    self.output_tensor_fields = output_tensor_fields
    # deduplicated unions of inputs and outputs, fed to OperatorBase
    ifields = set(self.input_fields.keys())
    ofields = set(self.output_fields.keys())
    fields = tuple(ifields.union(ofields))
    itfields = set(self.input_tensor_fields)
    otfields = set(self.output_tensor_fields)
    tfields = tuple(itfields.union(otfields))
    iparams = set(self.input_params.keys())
    oparams = set(self.output_params.keys())
    parameters = tuple(iparams.union(oparams))
    # Check the operator mpi_params against mpi_params of topologies that
    # were already specified (skipped for ComputationalGraph instances).
    if ("mpi_params" in self.__kwds) and (
        "ComputationalGraph"
        not in map(lambda c: c.__name__, self.__class__.__mro__)
    ):
        mpi_params = self.__kwds["mpi_params"]
        for topo in set(self.input_fields.values()).union(
            self.output_fields.values()
        ):
            if isinstance(topo, Topology) and (topo.mpi_params != mpi_params):
                d = topo.mpi_params.diff(mpi_params)
                msg = "MPI parameters mismatch between already specified topology mpi_params "
                msg += f"and operator MPI paramaters in operator {self.name}."
                msg += f"\n *operator: {mpi_params}"
                msg += f"\n *field: {topo.mpi_params}"
                msg += f"\n >diff : {d}\n"
                # a task_id+comm difference alone is tolerated as long as
                # 'on_task' does not differ as well
                if not (
                    "task_id" in d.keys()
                    and "comm" in d.keys()
                    and not "on_task" in d.keys()
                ):
                    raise RuntimeError(msg)
    super().__init__(
        name=self.name,
        fields=fields,
        tensor_fields=tfields,
        parameters=parameters,
        **self.__kwds,
    )
    # Consolidate unknown mpi_params for parameters.
    for p in iparams:
        if self.input_params[p] is None:
            self.input_params[p] = self.mpi_params
    for p in oparams:
        if self.output_params[p] is None:
            self.output_params[p] = self.mpi_params
    # after consolidation : None value not allowed anymore
    check_instance(self.input_params, dict, keys=Parameter, values=MPIParams)
    check_instance(self.output_params, dict, keys=Parameter, values=MPIParams)
    self._base_initialized = True
    self.all_input_fields = all_input_fields
    self.all_output_fields = all_output_fields
@debug
def _check_method(self, user_method):
    """
    Merge *user_method* into the node default method and validate each
    entry against available_methods(). Unknown keys only raise a
    HysopWarning; invalid values raise a ValueError.
    """
    method = dict(self.default_method())
    if user_method is not None:
        method.update(user_method)
    available_methods = self.available_methods()
    for key, value in method.items():
        if key not in available_methods.keys():
            msg = "{} is not an available method key for computational node {}."
            warnings.warn(msg.format(key, self.name), HysopWarning)
            continue
        candidates = to_set(available_methods[key])
        instance_checks = {c for c in candidates if isinstance(c, InstanceOf)}
        plain_values = candidates.difference(instance_checks)
        matched = any(c.match_instance(value) for c in instance_checks)
        if not (matched or (value in plain_values)):
            msg = f"{value} is not an available method value for key {key.__name__},"
            msg += f"\n possible values are {available_methods[key]}."
            raise ValueError(msg)
    return method
[docs]
@debug
@base_initialized
def check(self):
    """
    Validate this node after initialization: variables first,
    then topologies, then operator support flags.
    """
    checkers = (self._check_variables, self._check_topologies, self._check_support)
    for checker in checkers:
        checker()
@debug
@base_initialized
def _check_variables(self):
    """
    Verify that every input/output variable maps a continuous Field to
    a TopologyView. Called automatically in ComputationalGraphNode.check().
    """
    for variables in (self.input_fields, self.output_fields):
        for field, topo in variables.items():
            if not isinstance(field, Field):
                msg = "Given key is not a continuous Field (got a {})."
                raise TypeError(msg.format(field.__class__))
            if not isinstance(topo, TopologyView):
                msg = f"Expected a Topology instance but got a {topo.__class__}."
                msg += "\nAll topologies are expected to be set after "
                msg += "ComputationalGraph.get_field_requirements() has been called."
                raise TypeError(msg)
@debug
@base_initialized
def _check_topologies(self):
    """
    Inspect input/output variable topologies and set topology flags.

    Sets the following attributes:
        _is_distributed: True when running on more than one MPI process.
        _has_multiple_topologies: at least two distinct topologies used.
        _has_multiple_field_topologies: some field has different input
            and output topologies.
        _multi_topo_fields: set of fields with at least two topologies.

    Called automatically in ComputationalGraphNode.check()
    """
    is_distributed = self.mpi_params.size > 1
    has_multiple_topologies = False
    has_multiple_field_topologies = False
    multi_topo_fields = set()
    topos = tuple(self.input_fields.values()) + tuple(self.output_fields.values())
    if topos:
        # compare every variable topology against the first one found
        topo_ref = first_not_None(topos).topology
        for variables in [self.input_fields, self.output_fields]:
            for field, topo in variables.items():
                if topo is not None and (topo.topology != topo_ref):
                    has_multiple_topologies = True
    # local import to avoid a circular dependency
    from hysop.core.mpi.redistribute import RedistributeInter

    if isinstance(self, RedistributeInter):
        # inter-task redistribution: every field changes topology
        for field in set(self.input_fields.keys()).union(
            set(self.output_fields.keys())
        ):
            multi_topo_fields.add(field)
            has_multiple_field_topologies = True
    else:
        # a field read and written on different topologies is multi-topo
        for ifield in self.input_fields:
            if (
                ifield in self.output_fields
                and self.input_fields[ifield].topology
                != self.output_fields[ifield].topology
            ):
                multi_topo_fields.add(ifield)
                has_multiple_field_topologies = True
    self._is_distributed = is_distributed
    self._has_multiple_topologies = has_multiple_topologies
    self._has_multiple_field_topologies = has_multiple_field_topologies
    self._multi_topo_fields = multi_topo_fields
@debug
@base_initialized
def _check_support(self):
    """
    Check input and output variable topologies against the supported
    topologies of this node.

    See ComputationalGraphNode.supports_multiple_topologies()
        ComputationalGraphNode.supports_multiple_field_topologies()
        ComputationalGraphNode.supports_mpi()
    Called automatically in ComputationalGraphNode.check()

    Raises NotImplementedError when an unsupported configuration is found.
    """
    cls = self.__class__
    if (self._has_multiple_field_topologies) and (
        not cls.supports_multiple_field_topologies()
    ):
        # BUGFIX: this headline previously said 'multiple topologies';
        # this branch is about per-field input/output topology mismatch.
        msg = "Graph operator '{}' does not support multiple field topologies yet."
        msg += "\nTopology mismatch for continuous variable(s) {} between "
        msg += "input and output variables."
        msg = msg.format(self.name, [f.name for f in self._multi_topo_fields])
        raise NotImplementedError(msg)
    if (self._has_multiple_topologies) and (not cls.supports_multiple_topologies()):
        # BUGFIX: this headline previously said 'multiple field topologies'.
        msg = "Graph operator {} does not support multiple topologies yet."
        msg = msg.format(self.node_tag)
        msg += "\n>Input topologies:"
        for field, topo in self.input_fields.items():
            msg += f"\n *{field.short_description()} -> {topo.short_description()}"
        msg += "\n>Output topologies:"
        for field, topo in self.output_fields.items():
            msg += f"\n *{field.short_description()} -> {topo.short_description()}"
        raise NotImplementedError(msg)
    if (self._is_distributed) and (not cls.supports_mpi()):
        msg = "\nMPI multi-process has not been implemented in graph operator '{}' yet!\n"
        msg = msg.format(type(self))
        raise NotImplementedError(msg)
# ComputationalGraphNode interface
[docs]
@base_initialized
def get_topologies(self):
    """
    Return every topology used in this operator, organized by backend
    in a dictionary {backend: set of topologies}.
    """
    all_topos = set(self.input_fields.values())
    all_topos.update(self.output_fields.values())
    topologies = {}
    for topo in all_topos:
        if topo is None:
            continue
        topologies.setdefault(topo.backend, set()).add(topo)
    return topologies
[docs]
def get_domains(self):
    """
    Return the domains used by this operator as a dictionary mapping
    each domain to the set of operators (here {self}) with variables
    defined on it. A node without any variable fills the None key
    instead.
    """
    domains = {}
    all_fields = set(self.input_fields).union(self.output_fields)
    for field in all_fields:
        domains.setdefault(field.domain, set()).add(self)
    if self.is_domainless:
        domains.setdefault(None, set()).add(self)
    return domains
[docs]
@base_initialized
def get_backends(self):
    """Return the backends used by this operator (keys view of get_topologies())."""
    topologies = self.get_topologies()
    return topologies.keys()
[docs]
@abstractmethod
def available_methods(self):
    """
    Return the available methods of this node.

    This should return a dictionary with method keys and possible
    values as a scalar or an iterable.
    See hysop.tools.htypes.InstanceOf to support specific class types.
    This is used to check user method input (see _check_method()).
    """
    pass
[docs]
@abstractmethod
def default_method(self):
    """
    Return the default method of this node.

    Default methods should be compatible with available_methods.
    If the user-provided method dictionary misses some method keys,
    the default value for those keys is taken from this dictionary
    (see _check_method()).
    """
    pass
[docs]
@debug
def handle_method(self, method):
    """
    Store the fully preprocessed method dictionary.

    Called automatically during initialization, after the method has
    been (1) completed with compatible top graph values, (2) completed
    with the node default_method and (3) checked against
    available_methods. A copy is stored in self.method.
    """
    self.method = dict(method)
[docs]
@abstractmethod
@debug
def get_field_requirements(self):
    """
    Return the topology requirements of this operator.

    Called just after handle_method(), ie. once self.method is set.
    Requirements include min/max ghosts per variable, allowed splitting
    directions for cartesian topologies and required local/global
    transposition states. Keys are continuous fields, values are of
    type hysop.fields.field_requirements.DiscreteFieldRequirements.
    Default implementation returns empty requirements.
    """
    from hysop.fields.field_requirements import OperatorFieldRequirements

    reqs = OperatorFieldRequirements()
    return reqs
[docs]
@debug
def get_node_requirements(self):
    """Return global node requirements; called after get_field_requirements()."""
    reqs = NodeRequirements(self)
    return reqs
[docs]
@debug
def get_and_set_field_requirements(self):
    """
    Compute and store this node's field and node requirements.

    Sets self._field_requirements and self._node_requirements, checks
    node requirements against field requirements and returns the field
    requirements.
    """
    field_requirements = self.get_field_requirements()
    assert field_requirements is not None
    self._field_requirements = field_requirements
    node_requirements = self.get_node_requirements()
    assert isinstance(node_requirements, NodeRequirements)
    # BUGFIX: field_requirements was previously stored here by mistake.
    self._node_requirements = node_requirements
    node_requirements.check_and_update_reqs(field_requirements)
    return field_requirements
[docs]
def get_input_field_requirements(self):
    """
    Return input field requirements for this node.

    Raises RuntimeError when get_and_set_field_requirements() has not
    been called yet.
    """
    # BUGFIX: this getter was missing although it is referenced by the
    # input_field_requirements property below.
    freqs = self._field_requirements
    if freqs is None:
        msg = "{}.get_and_set_field_requirements() has not been called yet "
        msg += "on node {}."
        msg = msg.format(type(self).__name__, self.name)
        raise RuntimeError(msg)
    return freqs.input_field_requirements

def get_output_field_requirements(self):
    """
    Return output field requirements for this node.

    Raises RuntimeError when get_and_set_field_requirements() has not
    been called yet.
    """
    freqs = self._field_requirements
    if freqs is None:
        msg = "{}.get_and_set_field_requirements() has not been called yet "
        msg += "on node {}."
        msg = msg.format(type(self).__name__, self.name)
        raise RuntimeError(msg)
    return freqs.output_field_requirements

input_field_requirements = property(get_input_field_requirements)
output_field_requirements = property(get_output_field_requirements)
[docs]
@debug
def handle_topologies(self, input_topology_states, output_topology_states):
    """
    Called after all topologies have been set up.

    Topologies are available as values of self.input_fields and
    self.output_fields, mapped by continuous Field. The arguments are
    the input and output discrete topology states determined by the
    graph builder; they all comply with the operator field requirements
    obtained with self.get_field_requirements().
    Sets self.topology_handled to True.
    """
    from hysop.topology.topology import TopologyState

    for states in (input_topology_states, output_topology_states):
        check_instance(states, dict, keys=Field, values=TopologyState)
    self.topology_handled = True
[docs]
@classmethod
def get_topo_descriptor(cls, variables, field):
    """
    Return the topology descriptor registered for *field* in
    *variables*, looking first for the field itself, then inside
    registered tensor fields containing it. Raises KeyError when no
    descriptor is found.
    """
    if field in variables:
        return variables[field]
    for tfield in variables.keys():
        if tfield.is_tensor and (field in tfield):
            return variables[tfield]
    msg = "Could not find any topology descriptor corresponding to field {}."
    msg = msg.format(field.short_description())
    raise KeyError(msg)
[docs]
@classmethod
def get_topo_discretization(cls, variables, field):
    """Return the discretization of the topology descriptor bound to *field*."""
    descriptor = cls.get_topo_descriptor(variables=variables, field=field)
    return get_topo_descriptor_discretization(descriptor)
[docs]
@classmethod
def supports_multiple_topologies(cls):
    """
    Should return True if this node supports multiple topologies.
    Default implementation returns True.
    """
    return True
[docs]
@classmethod
def supports_multiple_field_topologies(cls):
    """
    Should return True if an input field that is also an output field
    can have an input topology different from its output topology.
    This is useful in Redistribute like operators.
    If this returns True this implies supports_multiple_topologies().
    It also implies that self.variables[field] may return a set of topologies.
    In this case one can recover input and output topologies by using
    self.input_fields[field] and self.output_fields[field].
    In addition one can find such fields by using the list self.multi_topo_fields
    which is set after ComputationalGraphNode.initialize() has been called.
    Default implementation returns False.
    """
    return False
[docs]
@classmethod
def supports_mpi(cls):
    """
    Return True if this operator was implemented to support multiple
    mpi processes. Default implementation returns False.
    """
    return False
[docs]
@debug
def pre_initialize(self, **kwds):
    """
    Function called before initialization; can be used to alter
    variables set in __init__ like input_fields or output_fields.
    By default this does nothing.
    """
    pass
[docs]
@debug
def post_initialize(self, **kwds):
    """
    Function called after initialization; can be used to execute
    routines after handle_method has been called.
    By default this does nothing.
    """
    pass
[docs]
@debug
def initialize(self, topgraph_method=None, **kwds):
    """
    Initialize this node and return the final method dictionary.

    Combines the top graph method with the user method through
    _setup_method(), hands the result to handle_method() and sets the
    self.initialized flag (after which discretize() may be called).
    Returns None when the node was already initialized.

    See ComputationalGraphNode.handle_method() for user method handling
    and ComputationalGraphNode.get_field_requirements() for topology
    requirements. After all operators have been initialized, the graph
    collects min/max ghosts required by each operator, used during
    discretization to build or check topologies.
    """
    if self.initialized:
        return None
    final_method = self._setup_method(topgraph_method)
    self.handle_method(final_method)
    self.initialized = True
    return final_method
[docs]
@debug
@topology_handled
def discretize(self):
    """
    Discretize this operator.
    By default this just sets the self.discretized flag to True.
    Once this flag is set one may call
    ComputationalGraphNode.get_work_properties() and
    ComputationalGraphNode.setup().
    """
    self.discretized = True
[docs]
@discretized
def get_output_discrete_field(self, field):
    """
    Return the discrete field bound to an output *field*:
    a DiscreteScalarField for an output ScalarField, a
    DiscreteTensorField for an output TensorField.
    Raises RuntimeError when discretization did not fill the discrete
    field containers or when *field* is not a registered output.
    """
    check_instance(field, Field)
    for attr in ("output_discrete_fields", "output_discrete_tensor_fields"):
        if getattr(self, attr) is None:
            msg = "{}(name={}) \n => Discretization did not set self.{}."
            msg = msg.format(self.full_tag, self.name, attr)
            raise RuntimeError(msg)
    if field.is_tensor:
        registered = self.output_tensor_fields
        discrete = self.output_discrete_tensor_fields
        kind = "TensorField"
    else:
        registered = self.output_fields
        discrete = self.output_discrete_fields
        kind = "ScalarField"
    if field not in registered:
        msg = "{} is not a registered output {} for graph node:\n{}"
        msg = msg.format(field.short_description(), kind, self.long_description())
        raise RuntimeError(msg)
    return discrete[field]
[docs]
@base_initialized
def iter_output_fields(
    self, with_scalars=True, with_tensors=True, as_scalars=False
):
    """
    Iterate over all output fields.

    By default yields tensor fields first, then standalone scalar
    fields. with_scalars/with_tensors restrict the iteration;
    as_scalars ravels all tensors into their scalar components.
    """
    assert with_scalars or with_tensors, "iterating over nothing"
    in_some_tensor = set()
    for tfield in self.output_tensor_fields:
        in_some_tensor.update(tfield.fields)
    if with_tensors and not as_scalars:
        yield from self.output_tensor_fields
    for field in self.output_fields:
        if field in in_some_tensor:
            # field is a component of some tensor field
            if with_tensors and as_scalars:
                yield field
        elif with_scalars:
            # standalone scalar field
            yield field
[docs]
@discretized
def iter_output_discrete_fields(
    self, with_scalars=True, with_tensors=True, as_scalars=False
):
    """
    Iterate over all output (field, discrete_field) pairs.

    By default yields tensor pairs first, then standalone scalar pairs.
    with_scalars/with_tensors restrict the iteration; as_scalars ravels
    all tensors into their scalar components.
    """
    assert with_scalars or with_tensors, "iterating over nothing"
    in_some_tensor = set()
    for tfield in self.output_tensor_fields:
        in_some_tensor.update(tfield.fields)
    if with_tensors and not as_scalars:
        yield from self.output_discrete_tensor_fields.items()
    for field, dfield in self.output_discrete_fields.items():
        if field in in_some_tensor:
            # field is a component of some tensor field
            if with_tensors and as_scalars:
                yield (field, dfield)
        elif with_scalars:
            # standalone scalar field
            yield (field, dfield)
[docs]
@debug
@discretized
def get_work_properties(self):
    """
    Return extra memory requirements of this node.

    This allows operators to request temporary buffers that will be
    shared between operators in a graph to reduce the memory footprint
    and the number of allocations. By default this returns None,
    meaning that this node requires no extra buffers.
    """
    return None
[docs]
@debug
@discretized
def setup(self, work):
    """
    Setup temporary buffers that have been requested in
    get_work_properties(). May also be used to execute post-allocation
    routines. Sets the self.ready flag to True; once set one may call
    ComputationalGraphNode.apply() and ComputationalGraphNode.finalize().
    """
    self.ready = True
[docs]
@abstractmethod
@ready
def apply(self, simulation=None, **kwds):
    """
    Abstract method that should be implemented.
    Applies this node (operator, computational graph operator...).
    Only callable once setup() has set the ready flag.
    """
    pass
[docs]
@debug
@ready
def finalize(self, **kwds):
    """
    Cleanup this node (free memory from external solvers, ...).
    By default this only clears the self.ready flag.
    """
    self.ready = False
[docs]
def dump_outputs(
    self,
    fields=None,
    io_params=None,
    filename=None,
    frequency=None,
    fileformat=None,
    io_leader=None,
    **op_kwds,
):
    """
    Tell this operator to dump some of its outputs after apply is
    called.

    Target folder, file, dump frequency and other io parameters are
    passed through the io_params instance parameter or as keywords
    (keywords take precedence). When fields is None, all output fields
    are dumped. Raises RuntimeError when a given field is not an output
    of this operator, or when no io_params can be determined.
    """
    if fields is not None:
        if isinstance(fields, Field):
            fields = (fields,)
        check_instance(fields, (set, list, tuple), values=Field)
        if self._base_initialized:
            for field in fields:
                if (field not in self.output_fields) and (
                    field not in self.output_tensor_fields
                ):
                    msg = "Field {} is not an output field of operator {}."
                    msg = msg.format(field.name, self.name)
                    raise RuntimeError(msg)
    else:
        fields = self.all_output_fields
    # sort for deterministic file naming
    fields = list(sorted(fields, key=lambda f: f.name))
    if io_params is None:
        io_params = self.io_params
        if io_params is None:
            msg = f"io_params was never set for operator {self.name}."
            raise RuntimeError(msg)
    frequency = first_not_None(frequency, io_params.frequency)
    fileformat = first_not_None(fileformat, io_params.fileformat)
    io_leader = first_not_None(io_leader, io_params.io_leader)
    if filename is None:
        # NOTE: a dead 'elif fields is None' branch was removed here;
        # fields is always a list at this point.
        filename = "{}_{}".format(
            io_params.filename, "_".join(f"{f.name}out" for f in fields)
        )
    io_params = IOParams(
        filename=filename,
        frequency=frequency,
        fileformat=fileformat,
        io_leader=io_leader,
    )
    self._output_fields_to_dump.append((fields, io_params, op_kwds))
@property
def node_tag(self):
    """
    Tag of this node as a string (class name + node name).
    Cannot be used to differentiate nodes as some operators may share
    the same name.
    """
    return "{}::{}".format(self.__class__.__name__, self.name)
[docs]
def long_description(self):
    """
    Return a detailed multi-line description of this node, listing its
    input/output fields, tensor fields and parameters.

    Sections are built by direct concatenation. The previous
    implementation piped the partially-built string back through
    str.format, which raised whenever a field or parameter description
    contained a '{' or '}' character.
    """
    sep = "\n *"

    def _section(items):
        # one section body: bulleted short descriptions, or ' None'
        items = tuple(items)
        if items:
            return sep + sep.join(x.short_description() for x in items)
        return " None"

    ss = f"{self.full_tag}[name={self.name}, pname:{self.pretty_name}]"
    ss += "\n INPUT FIELDS:" + _section(self.input_fields.keys())
    ss += "\n OUTPUT FIELDS:" + _section(self.output_fields.keys())
    ss += "\n INPUT TENSOR FIELDS:" + _section(self.input_tensor_fields)
    ss += "\n OUTPUT TENSOR FIELDS:" + _section(self.output_tensor_fields)
    ss += "\n INPUT PARAMS:" + _section(self.input_params.keys())
    ss += "\n OUTPUT PARAMS:" + _section(self.output_params.keys())
    return ss